Fix realtime ingestion when an entire batch of messages is filtered out #7927
Conversation
} catch (PermanentConsumerException e) {
  _segmentLogger.warn("Permanent exception from stream when fetching messages, stopping consumption", e);
  throw e;
} catch (Exception e) {
  // all exceptions but PermanentConsumerException are handled the same way
  // can be a TimeoutException or TransientConsumerException routinely
Collapsed identical catch blocks for the sake of the person reading the code.
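To make that concrete, here is a minimal, self-contained sketch of the collapsed structure (hypothetical FakeConsumer and exception stand-ins, not the actual Pinot classes): the fatal PermanentConsumerException keeps its own catch and is rethrown, while every other exception shares one recoverable-error path instead of several identical catch blocks.

import java.util.concurrent.TimeoutException;

public class CollapsedCatchSketch {
  // stand-ins for the stream plugin exceptions
  static class PermanentConsumerException extends RuntimeException {}
  static class TransientConsumerException extends RuntimeException {}

  interface FakeConsumer {
    Object fetchMessages(long startOffset, int timeoutMillis) throws TimeoutException;
  }

  static Object fetchOnce(FakeConsumer consumer, long startOffset) {
    try {
      return consumer.fetchMessages(startOffset, 5000);
    } catch (PermanentConsumerException e) {
      // fatal: log and rethrow so the consume loop stops
      System.err.println("Permanent exception from stream, stopping consumption: " + e);
      throw e;
    } catch (Exception e) {
      // TimeoutException, TransientConsumerException, etc. are all handled the same way:
      // log and let the caller retry on the next loop iteration
      System.err.println("Transient exception while fetching, will retry: " + e);
      return null;
    }
  }
}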
} else if (messageBatch.getUnfilteredMessageCount() > 0) {
  // we consumed something from the stream but filtered all the content out,
  // so we need to advance the offsets to avoid getting stuck
  _currentOffset = messageBatch.getLastOffset();
  lastUpdatedOffset = _streamPartitionMsgOffsetFactory.create(_currentOffset);
This is the bug fix: it ensures that we advance the offset after consuming a fully filtered batch.
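As a standalone illustration of why this matters (hypothetical Batch interface, not Pinot's real MessageBatch API): if every message in a batch is filtered out and the offset is not advanced, the consumer refetches the same tombstones forever.

public class AdvanceOffsetSketch {
  interface Batch {
    int getMessageCount();            // messages that survived filtering
    int getUnfilteredMessageCount();  // messages read from the stream before filtering
    long getLastOffset();             // offset to resume from after the last message read
  }

  // decides where the next fetch should start from
  static long nextStartOffset(Batch batch, long currentOffset) {
    if (batch.getMessageCount() > 0) {
      // normal case: messages were indexed, so move past the batch
      return batch.getLastOffset();
    }
    if (batch.getUnfilteredMessageCount() > 0) {
      // the fix: we read something but filtered all of it out,
      // so still move past the batch instead of getting stuck at currentOffset
      return batch.getLastOffset();
    }
    // nothing was read at all: stay put and poll again later
    return currentOffset;
  }
}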
}
updateCurrentDocumentCountMetrics();
if (streamMessageCount != 0) {
  _segmentLogger.debug("Indexed {} messages ({} messages read from stream) current offset {}", indexedMessageCount,
      streamMessageCount, _currentOffset);
- } else {
+ } else if (messagesAndOffsets.getUnfilteredMessageCount() == 0) {
Prevents unnecessary latency when there has been a bad batch: there is probably more data waiting to be consumed.
@@ -63,6 +63,11 @@
   private final boolean _enableLeadControllerResource = RANDOM.nextBoolean();
   private final long _startTime = System.currentTimeMillis();

+  @Override
+  protected boolean injectTombstones() {
+    return true;
Set this to false to make the test pass at eb6800da96a44e6f5125097cc99a368c2f8f8847
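For context, a Kafka tombstone is just a record with a null value; a hedged sketch of how a test could inject them (a hypothetical helper using the plain kafka-clients producer, not the actual test harness hook):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringSerializer;

public class TombstoneInjectionSketch {
  static void injectTombstones(String bootstrapServers, String topic, int count) {
    Properties props = new Properties();
    props.put("bootstrap.servers", bootstrapServers);
    props.put("key.serializer", StringSerializer.class.getName());
    props.put("value.serializer", ByteArraySerializer.class.getName());
    try (Producer<String, byte[]> producer = new KafkaProducer<>(props)) {
      for (int i = 0; i < count; i++) {
        // null value == tombstone; these occupy offsets but carry no payload to index
        producer.send(new ProducerRecord<>(topic, "tombstone-key-" + i, null));
      }
      producer.flush();
    }
  }
}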
    throws IOException {
  super.close();
List<ConsumerRecord<String, Bytes>> messageAndOffsets = consumerRecords.records(_topicPartition);
List<MessageAndOffset> filtered = new ArrayList<>(messageAndOffsets.size());
Note that a list was being materialised anyway in KafkaMessageBatch; it's just easier to do it here because we can also capture the last offset. This is likely more efficient than using Iterables.filter anyway.
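A rough, self-contained sketch of that shape (simplified value type and hypothetical MessageAndOffset/FilteredBatch holders, not the exact Kafka 2.0 plugin classes): materialise the surviving records into a list while remembering how far the stream advanced, so the batch can still report a last offset when nothing survives filtering.

import java.util.ArrayList;
import java.util.List;
import org.apache.kafka.clients.consumer.ConsumerRecord;

public class FilterBatchSketch {
  static final class MessageAndOffset {
    final byte[] message;
    final long offset;
    MessageAndOffset(byte[] message, long offset) {
      this.message = message;
      this.offset = offset;
    }
  }

  static final class FilteredBatch {
    final List<MessageAndOffset> messages;
    final int unfilteredCount;
    final long lastOffset;
    FilteredBatch(List<MessageAndOffset> messages, int unfilteredCount, long lastOffset) {
      this.messages = messages;
      this.unfilteredCount = unfilteredCount;
      this.lastOffset = lastOffset;
    }
  }

  static FilteredBatch filter(List<ConsumerRecord<String, byte[]>> records, long startOffset) {
    List<MessageAndOffset> filtered = new ArrayList<>(records.size());
    long lastOffset = startOffset;
    for (ConsumerRecord<String, byte[]> record : records) {
      lastOffset = record.offset();
      if (record.value() != null) { // drop tombstones
        filtered.add(new MessageAndOffset(record.value(), record.offset()));
      }
    }
    // even when 'filtered' is empty, the caller still learns how far the stream advanced
    return new FilteredBatch(filtered, records.size(), lastOffset);
  }
}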
@@ -40,7 +40,7 @@
  * versions of the stream implementation
  */
 @InterfaceStability.Evolving
-public interface StreamPartitionMsgOffset extends Comparable {
+public interface StreamPartitionMsgOffset extends Comparable<StreamPartitionMsgOffset> {
The raw type caused a lot of warnings, which make the code harder to read; parameterising it also helps the compiler prevent heap pollution bugs.
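For anyone wondering what the parameterisation buys, a small sketch (hypothetical LongOffset class, not Pinot's implementations): with the raw Comparable, compareTo takes Object and every implementation needs an unchecked cast, whereas the typed interface lets the compiler reject comparisons against unrelated types.

public class TypedOffsetSketch {
  interface StreamPartitionMsgOffset extends Comparable<StreamPartitionMsgOffset> {
  }

  static final class LongOffset implements StreamPartitionMsgOffset {
    private final long _offset;

    LongOffset(long offset) {
      _offset = offset;
    }

    @Override
    public int compareTo(StreamPartitionMsgOffset other) {
      // no cast from Object needed, and comparing against a non-offset type no longer compiles
      return Long.compare(_offset, ((LongOffset) other)._offset);
    }
  }
}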
 * @return number of messages returned from the stream
 */
default int getUnfilteredMessageCount() {
  return getMessageCount();
Implemented for Kafka 2.0 only.
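Roughly, the contract looks like this (simplified interface, not the full Pinot MessageBatch API): plugins that don't filter can keep the default, while the Kafka 2.0 batch reports the number of records actually read from the partition before filtering.

import java.util.List;

public class UnfilteredCountSketch {
  interface MessageBatch<T> {
    List<T> getMessages();

    default int getMessageCount() {
      return getMessages().size();
    }

    // stream plugins that don't filter can rely on this default
    default int getUnfilteredMessageCount() {
      return getMessageCount();
    }
  }

  static final class KafkaLikeBatch implements MessageBatch<byte[]> {
    private final List<byte[]> _filteredMessages;
    private final int _unfilteredCount;

    KafkaLikeBatch(List<byte[]> filteredMessages, int unfilteredCount) {
      _filteredMessages = filteredMessages;
      _unfilteredCount = unfilteredCount;
    }

    @Override
    public List<byte[]> getMessages() {
      return _filteredMessages;
    }

    @Override
    public int getUnfilteredMessageCount() {
      // records read from Kafka before tombstones were dropped
      return _unfilteredCount;
    }
  }
}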
force-pushed from 0430bc6 to d9c590e
Codecov Report

@@             Coverage Diff              @@
##             master     #7927     +/-   ##
============================================
+ Coverage     71.08%     71.11%    +0.03%
- Complexity     4109       4116        +7
============================================
  Files          1593       1593
  Lines         82373      82379        +6
  Branches      12269      12274        +5
============================================
+ Hits          58555      58586       +31
+ Misses        19867      19852       -15
+ Partials       3951       3941       -10
Good catch! Thanks for adding the test
force-pushed from 3e3184b to 5214304
LGTM
LGTM, with a few minor suggestions.
On a separate note, I saw that originally there were some optimizations piggy-backed onto this PR. We should avoid doing that. Each PR should focus on only one feature, one bug fix, etc. Any optimization or refactoring should go in a separate PR. That might take a little longer for the author, but it benefits all of us in the long run.
There were no optimisations in this PR; there were some fixes for concurrency bugs (e.g. it is incorrect to use an increment operator on a volatile variable), so I'm not sure what you're referring to.
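To illustrate the point about volatile increments (a standalone demo with hypothetical counter names, not Pinot code): volatile guarantees visibility, but x++ is still a separate read, add, and write, so concurrent increments can be lost; AtomicLong makes the whole operation atomic.

import java.util.concurrent.atomic.AtomicLong;

public class VolatileIncrementSketch {
  private static volatile long brokenCounter = 0;                 // loses updates under contention
  private static final AtomicLong safeCounter = new AtomicLong(); // atomic read-modify-write

  public static void main(String[] args) throws InterruptedException {
    Runnable task = () -> {
      for (int i = 0; i < 100_000; i++) {
        brokenCounter++;               // not atomic: read, add, write
        safeCounter.incrementAndGet(); // atomic
      }
    };
    Thread t1 = new Thread(task);
    Thread t2 = new Thread(task);
    t1.start();
    t2.start();
    t1.join();
    t2.join();
    // brokenCounter is usually below 200000; safeCounter is always exactly 200000
    System.out.println("volatile++ : " + brokenCounter);
    System.out.println("AtomicLong : " + safeCounter.get());
  }
}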
I'm not sure this is the best place to discuss this, but you are implying that the change you mentioned was unrelated to the PR it was made in, when it wasn't. The purpose of the change, as has been discussed, is that the particular class creates very large buffers. It was included in that PR because its use for MV BYTES columns exacerbates this by multiplying what is already an overestimate of the buffer size by the maximum number of elements in the column, so the change was a mitigation of a worst case made worse by that PR.
Looking at commits to figure out what changed isn't an efficient diagnostic technique. Had you instead used a profiler, you would have found the process was spending a large amount of time in the syscalls
force-pushed from 104a3bf to 29e5d20
@richardstartin I don't want to get into details here. My point was that one PR should focus on one thing, not more!
eb6800da96a44e6f5125097cc99a368c2f8f8847 creates conditions where LLRealtimeSegmentDataManager cannot advance if a message batch consumed from Kafka is entirely filtered out, e.g. because it only contains tombstones. LLCRealtimeClusterIntegrationTest hangs on this commit.

01f9f16a18b188bd3e15408ad945eda9541caf75 allows offsets to be committed when the entire batch was filtered out. It also fixes some of the myriad warnings and concurrency bugs in LLRealtimeSegmentDataManager.